/*
 * TMediaCasterConnection.cpp
 *
 *  Created on: 2016年7月14日
 *      Author: terry
 */

#include "TMediaCasterConnection.h"
#include "TMediaCasterServer.h"
#include "CLog.h"
#include "TStringUtil.h"
#include "Socket.h"
#include "MediaCasterTable.h"
#include <assert.h>
#include <StopWatch.h>
#include "MediaTransportHelper.h"
#include "MediaCasterConfig.h"


namespace av
{

TMediaCasterConnection::TMediaCasterConnection():
		m_server(),
		m_header(),
		m_headerReady()
{
	m_buffer.ensure(1024*500);
}

TMediaCasterConnection::~TMediaCasterConnection()
{
}

bool TMediaCasterConnection::open(comn::Socket& sock, comn::SockAddr& addr, TMediaCasterServer* server)
{
	if (!sock.isOpen())
	{
		return false;
	}

	m_socket = sock;
	m_addr = addr;
	m_server = server;

	int value = TRUE;
    int ret = ::setsockopt(m_socket.getHandle(), IPPROTO_TCP, TCP_NODELAY, (char*)&value, sizeof(value));

    if (MediaCasterConfig::s_sendBufSize > 0)
    {
        sock.setSendBufferSize(MediaCasterConfig::s_sendBufSize);
    }

	return comn::Thread::start();
}

void TMediaCasterConnection::close()
{
	if (isRunning())
	{
		comn::Thread::stop();
	}

	m_socket.close();
}

bool TMediaCasterConnection::isOpen()
{
	return m_socket.isOpen() && isRunning();
}

std::string	TMediaCasterConnection::getStreamName() const
{
	return m_streamName;
}

socket_t TMediaCasterConnection::getSocket() const
{
	return m_socket.getHandle();
}

comn::SockAddr TMediaCasterConnection::getPeerName() const
{
	return m_addr;
}

void TMediaCasterConnection::onMediaFormat(const MediaFormat& fmt)
{
	m_format = fmt;
}

void TMediaCasterConnection::onMediaPacket(MediaPacketPtr& pkt)
{
	m_pktQueue.push(pkt);
}

void TMediaCasterConnection::onMediaEvent(int event)
{
	// pass
}

int TMediaCasterConnection::run()
{
	if (!checkRead(1000*5) || !handleCommand())
	{
		handleClose();
		return 0;
	}

	while (!m_canExit)
	{
		MediaPacketPtr pkt;
		m_pktQueue.pop(pkt, -1);

		if (!pkt)
		{
			continue;
		}

		int64_t duration = m_pktQueue.getDuration();
		if (duration > MediaCasterConfig::s_queueDuration)
		{
			size_t count = m_pktQueue.dropUntilKeyFrame();
			CLog::warning("que duration is too much. duration:%d, drop:%d\n", (int)duration, count);
		}

		if (!sendPacket(pkt))
        {
            break;
        }
	}

    handleClose();

	return 0;
}

void TMediaCasterConnection::doStop()
{
	m_pktQueue.cancelWait();

	if (m_socket.isOpen())
	{
		m_socket.close();
	}
}



bool TMediaCasterConnection::checkRead(long ms)
{
	return m_socket.checkReadAndClose(ms) > 0;
}

bool TMediaCasterConnection::handleCommand()
{
	int length = m_socket.receive((char*)&m_header, sizeof(m_header), 0);
	if (length != sizeof(m_header))
	{
		return false;
	}

	m_buffer.ensure(m_header.length);

	length = m_socket.receive((char*)m_buffer.data(), m_header.length, 0);
	if (length != m_header.length)
	{
		return false;
	}

    std::string json(m_buffer.c_str(), m_header.length);
    Json::Value req;
    Json::Reader reader;
    if (!reader.parse(json, req))
    {
        return false;
    }

    std::string name = req["name"].asString();

	CMediaCasterPtr caster;
	MediaCasterTable::instance().find(name, caster);
	//if (!caster)
	{
		TMediaCasterConnectionPtr conn = shared_from_this();
		m_server->onAcquireStream(name, conn);

		MediaCasterTable::instance().find(name, caster);
	}

	bool done = false;
	Json::Value resp;

	if (caster)
	{
		done = true;

		m_streamName = name;

		av::MediaSinkPtr sink = shared_from_this();
		caster->addSink(sink);

		av::MediaFormat fmt;
		caster->getFormat(fmt);

		MediaTransportHelper::toJson(fmt, resp);
	}
	else
	{
		MediaTransportHelper::makeError(resp, ENOENT, "no such stream");
	}

	std::string text = comn::StringCast::toString(resp);

	MediaTransportHeader header;
	if (done)
	{
		MediaTransport::setCommand(header, av::kMediaCmdResp);
	}
	else
	{
		MediaTransport::setCommand(header, av::kMediaCmdError);
	}

	header.length = text.size();

	m_buffer.write((char*)&header, sizeof(header));
	m_buffer.write(text.c_str(), text.size());

	m_socket.send((char*)m_buffer.data(), m_buffer.length(), 0);

	return done;
}

void TMediaCasterConnection::handleClose()
{
	TMediaCasterConnectionPtr conn = shared_from_this();
	m_server->remove(conn);
}

bool TMediaCasterConnection::sendPacket(MediaPacketPtr& pkt)
{
	MediaTransportHeader header;
	memset(&header, 0, sizeof(header));
	header.magic = MediaTransport::MAGIC;
	header.tag = kMediaData;
    header.subtype = pkt->type;
    header.length = pkt->size;
    header.timestamp = pkt->pts;
    header.duration = pkt->duration;
    header.flags = pkt->flags;

    m_buffer.clear();
	m_buffer.write((char*)&header, sizeof(header));
	m_buffer.write(pkt->data, pkt->size);

    int ret = m_socket.send((char*)m_buffer.data(), m_buffer.length());

    return ret > 0;
}


} /* namespace av */
